package com.nx.platform.es.biz.esspider.sink;

import com.google.common.base.Preconditions;
import com.nx.platform.es.common.utils.MoreMaps;
import com.nx.platform.es.service.ESClientManager;
import org.jetbrains.annotations.NotNull;

import java.lang.reflect.Constructor;
import java.util.Map;

import static com.nx.platform.es.common.utils.Constants.SINKS_CLASS_NAME;
import static com.nx.platform.es.common.utils.Constants.SINKS_ID;

/**
 * Sink工厂
 *
 * @author
 * @since 2017年3月24日
 */
public class SinkFactory {

    /**
     * 创建Sink
     *
     * @param settings sink settings
     * @return
     * @throws Exception
     */
    @NotNull
    public static Sink createSink(ESClientManager esManager, Map<?, ?> settings) throws Exception {
        //
        String simpleClassName = MoreMaps.getString(settings, SINKS_CLASS_NAME);
        String className = Sink.class.getPackage().getName() + "." + simpleClassName;
        Class<?> clazz = Class.forName(className);
        //
        SinkDefine define = clazz.getAnnotation(SinkDefine.class);
        String defineErrorMsg = "sink type not found, class: " + className;
        Preconditions.checkNotNull(define, defineErrorMsg);
        //
        SinkType type = define.value();
        String typeErrorMsg = "sink type is null, class: " + className;
        Preconditions.checkNotNull(type, typeErrorMsg);
        //
        switch (type) {
        case ES:
            Constructor<?> dc = clazz.getConstructor(String.class, ESClientManager.class);
            String dcErrorMsg = "sink constructor(String) not found, class: " + className;
            Preconditions.checkNotNull(dc, dcErrorMsg);
            String esId = MoreMaps.getString(settings, SINKS_ID);
            Sink esSink = (Sink) dc.newInstance(esId, esManager);
            esSink.init(settings);
            return esSink;
        case Simple:
            Constructor<?> sc = clazz.getConstructor(String.class);
            String scErrorMsg = "sink constructor(String) not found, class: " + className;
            Preconditions.checkNotNull(sc, scErrorMsg);
            String simpleId = MoreMaps.getString(settings, SINKS_ID);
            Sink simpleSink = (Sink) sc.newInstance(simpleId);
            simpleSink.init(settings);
            return simpleSink;
        }
        //
        String errorMsg = "sink type no support, class: " + className;
        throw new IllegalArgumentException(errorMsg);
    }

}
